
如題,當初會寫這篇參考筆記,主要是因為,當時正在解【Spark】Spark基础练习题(二)這篇文章上的題目時,碰到了(如上圖)這題,需要使用 spark 把資料寫入 Mysql 中,再讀取出來,在順便把搜尋到相關的教學資料,統整成一篇筆記,並補上一些圖片方便閱讀XD。
特此撰寫本篇文章作為紀錄文件,用以方便後續有需要的時候,可以快速的重複查閱,雖然後面比較沒有什麼機會再用到,但也算是一個還不錯的經驗。
JAVA_HOME = C:\Java\jre1.8.0_241

pip3 install pyspark
HADOOP_HOME = C:\winutils
接著還會遇到暫存資料夾 tmp\hive 權限的問題
10. 在你要開啟專案的硬碟的根目錄 (如 E:) 建立資料夾 \tmp\hive
(假如 ipynb 在 E:\abc\def\code.ipynb 內,就建立兩個資料夾在 E:\tmp\hive,一般此資料夾會自動在執行 pyspark 時建立,但權限會有問題,需手動修改)
11. 用 winutils.exe 改變該暫存資料夾的權限
%HADOOP_HOME%\bin\winutils.exe chmod 777 E:\tmp\hive
%HADOOP_HOME%\bin\winutils.exe ls E:\tmp\hive
應該要為 drwxrwxrwx
這樣應該就可以正常使用 pyspark 了
pyspark連接Mysql是通過java實現的,所以需要下載連接Mysql的jar包。
下載地址

選擇下載Connector/J,然後選擇操作系統為Platform Independent,下載壓縮包到本地。
然後因為是直接通過 pip3 install pyspark 的方式安裝 PySpark,可以先透過以下程式碼查詢 PySpark路徑:(參考:PySpark 連線 MySQL 示例)
from pyspark import find_spark_home
print(find_spark_home._find_spark_home())
然後解壓文件,將其中的jar包mysql-connector-java-8.0.22.jar放入spark的安裝目錄下的jars資料夾
下方程式碼參考:PySpark 連接 MySQL 示例的Spark 代碼示例
from pyspark import SparkContext
from pyspark.sql import SQLContext
if __name__ == '__main__':
    # spark 初始化
    sc = SparkContext(master='local', appName='sql')
    spark = SQLContext(sc)
    # mysql 配置(需要修改)
    prop = {'user': 'root',
            'password': 'rootroot',
            'driver': 'com.mysql.cj.jdbc.Driver'}
    # database 地址(需要修改)
    url = 'jdbc:mysql://localhost:3306/testdb?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC'
    
    # 讀取表
    data = spark.read.jdbc(url=url, table='user', properties=prop)
    # 打印data數據類型
    print(type(data))
    # 展示數據
    data.show()
    # 關閉spark會話
    sc.stop()
注意點:
輸出畫面如下:
參考:使用JDBC連接MySql時出現...
在連接字符串後面加上?serverTimezone=UTC
其中UTC是統一標準世界時間。
完整的連接字符串示例:jdbc:mysql://localhost:3306/test?serverTimezone=UTC
或者還有另一種選擇:jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=UTF-8,這個是解決中文亂碼輸入問題,當然也可以和上面的一起結合:jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC
下方程式碼參考:pyspark對Mysql數據庫進行讀寫
的4 寫入Mysql
import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row
if __name__ == '__main__':
    # spark 初始化
    sc = SparkContext(master='local', appName='sql')
    spark = SQLContext(sc)
    # mysql 配置(需要修改)
    prop = {'user': 'root',
            'password': 'rootroot',
            'driver': 'com.mysql.cj.jdbc.Driver'}
    # database 地址(需要修改)
    url = 'jdbc:mysql://localhost:3306/testdb?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC'
    # 創建spark DataFrame
    # 方式1:list轉spark DataFrame
    l = [(1, 12), (2, 22)]
    # 創建並指定列名
    list_df = spark.createDataFrame(l, schema=['id', 'value']) 
    
    # 方式2:rdd轉spark DataFrame
    rdd = sc.parallelize(l)  # rdd
    col_names = Row('id', 'value')  # 列名
    tmp = rdd.map(lambda x: col_names(*x))  # 設置列名
    rdd_df = spark.createDataFrame(tmp)  
    
    # 方式3:pandas dataFrame 轉spark DataFrame
    df = pd.DataFrame({'id': [1, 2], 'value': [12, 22]})
    pd_df = spark.createDataFrame(df)
    # 寫入數據庫
    pd_df.write.jdbc(url=url, table='new', mode='append', properties=prop)
    # 關閉spark會話
    sc.stop()
效果如下:
注意點:

當數據庫無寫入的表時,這四種模式都會根據設定的表名稱自動創建表,無需在Mysql裡先建表。